package com.shirley.reactiveEx;

import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.Objects;

public class FluxTest {
    /**
     * Scratch entry point: builds a one-element {@code Flux} with an error fallback and
     * prints every user it emits to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
//        Flux.just(User.SKYLER)
//                .doOnRequest(a -> {System.out.println(a);})
//                .doOnNext(u -> System.out.println(u.getFirstname() +" "+ u.getLastname()))
//                .doFirst(() -> System.out.println("Starring:"))
//                .doOnComplete(() ->  System.out.println("The end!"));

        // A Flux is lazy: without a subscriber the pipeline is only assembled and never runs,
        // so the chain must end with subscribe(...) for anything to happen.
        Flux.just(User.SKYLER)
                .onErrorResume(error -> Flux.just(User.SKYLER, User.SAUL))
                .subscribe(System.out::println);
    }


    // TODO Insert users contained in the Flux parameter in the blocking repository using a bounded elastic scheduler and return a Mono<Void> that signals the end of the operation
    /**
     * Saves every user emitted by {@code flux} into the blocking {@code repository}.
     * The elements are handed over to the bounded elastic scheduler first, so the blocking
     * {@code save} call runs on one of its threads.
     *
     * @param flux       source of the users to store
     * @param repository blocking repository that receives each user
     * @return a {@code Mono<Void>} that terminates when {@code flux} terminates
     */
    Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
        Flux<User> onElasticThread = flux.publishOn(Schedulers.boundedElastic());
        Flux<User> savedUsers = onElasticThread.doOnNext(user -> repository.save(user));
        return savedUsers.then();
    }

    // TODO Create a Flux for reading all users from the blocking repository deferred until the flux is subscribed, and run it with a bounded elastic scheduler
    /**
     * Exposes all users of the blocking {@code repository} as a {@code Flux}.
     * The repository is queried only when the returned flux is subscribed, and the
     * subscription is performed on the bounded elastic scheduler.
     *
     * @param repository blocking repository to read from
     * @return a flux of the users returned by {@code repository.findAll()}
     */
    Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
        Flux<User> lazyUsers = Flux.defer(() -> {
            // Runs once per subscription, never at assembly time.
            List<User> users = repository.findAll();
            return Flux.fromIterable(users);
        });
        return lazyUsers.subscribeOn(Schedulers.boundedElastic());
    }

    // TODO Return the users contained in that Flux
    /**
     * Bridges a reactive sequence into a plain {@code Iterable}.
     *
     * @param flux sequence of users to expose
     * @return the result of {@code Flux#toIterable()}, an iterable over the users of {@code flux}
     */
    Iterable<User> fluxToValues(Flux<User> flux) {
        Iterable<User> users = flux.toIterable();
        return users;
    }
    /**
     * Subscribes to {@code mono} and waits for its result.
     *
     * @param mono source of at most one user
     * @return the result of {@code Mono#block()}: the emitted user, or {@code null} when the
     *         mono completes empty
     */
    User monoToValue(Mono<User> mono) {
        User value = mono.block();
        return value;
    }
    // TODO Convert the input Flux<User> to a Mono<List<User>> containing list of collected flux values
    /**
     * Gathers all users emitted by {@code flux} into a single list.
     *
     * @param flux sequence of users to collect
     * @return a mono emitting the list built by {@code Flux#collectList()}
     */
    Mono<List<User>> fluxCollection(Flux<User> flux) {
        Mono<List<User>> collected = flux.collectList();
        return collected;
    }


    // TODO Return the same mono passed as input parameter, except that it will emit User.SKYLER when empty
    /**
     * Returns a mono that behaves like {@code mono} but falls back to {@link User#SKYLER}
     * when {@code mono} completes without emitting a value.
     *
     * @param mono source that may complete empty
     * @return the user from {@code mono}, or {@code User.SKYLER} if there is none
     */
    Mono<User> emptyToSkyler(Mono<User> mono) {
        User fallback = User.SKYLER;
        return mono.defaultIfEmpty(fallback);
    }

    // TODO Return a valid Mono of user for null input and non null input user (hint: Reactive Streams do not accept null values)
    /**
     * Wraps a possibly-{@code null} user in a mono.
     *
     * @param user the user to wrap; may be {@code null}
     * @return an empty mono for {@code null}, otherwise a mono emitting {@code user}
     */
    Mono<User> nullAwareUserToMono(User user) {
        // Reactive Streams forbid null elements, so null maps to an empty mono.
        if (user == null) {
            return Mono.empty();
        }
        return Mono.just(user);
    }

    // TODO Convert the input Flux<User> to a Mono<Void> that represents the complete signal of the flux
    /**
     * Reduces {@code flux} to its termination signal only.
     *
     * @param flux sequence whose elements are not of interest
     * @return the {@code Mono<Void>} produced by {@code Flux#then()}
     */
    Mono<Void> fluxCompletion(Flux<User> flux) {
        Mono<Void> completion = flux.then();
        return completion;
    }

    /**
     * Races two monos against each other via {@code Mono.firstWithValue}.
     *
     * @param mono1 first candidate
     * @param mono2 second candidate
     * @return a mono that replays the candidate which is first to emit a value
     */
    Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
        Mono<User> winner = Mono.firstWithValue(mono1, mono2);
        return winner;
    }
    // TODO Create a Flux of user from Flux of username, firstname and lastname.
    /**
     * Combines three parallel string sequences into a sequence of users.
     * The n-th user is built from the n-th username, firstname and lastname.
     *
     * @param usernameFlux  source of usernames
     * @param firstnameFlux source of first names
     * @param lastnameFlux  source of last names
     * @return a flux of users assembled from the zipped sources
     */
    Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
        // The three-argument zip yields typed tuples, so no Object[] casts are needed.
        return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux)
                .map(names -> new User(names.getT1(), names.getT2(), names.getT3()));
    }
    /**
     * Applies {@link #capitalizeUser(User)} to every user of {@code flux}.
     * A checked {@code GetOutOfHereException} raised for a user is rethrown through
     * {@code Exceptions.propagate}, because the mapping lambda cannot throw checked exceptions.
     *
     * @param flux users to transform
     * @return a flux of the transformed users
     */
    Flux<User> capitalizeMany(Flux<User> flux) {
        return flux.map(user -> {
            User capitalized;
            try {
                capitalized = capitalizeUser(user);
            } catch (GetOutOfHereException checked) {
                throw Exceptions.propagate(checked);
            }
            return capitalized;
        });
    }

    /**
     * Returns a copy of {@code user} with username, firstname and lastname upper-cased.
     *
     * @param user the user to capitalize
     * @return a new, upper-cased {@code User}
     * @throws GetOutOfHereException if {@code user} equals {@link User#SAUL}
     */
    User capitalizeUser(User user) throws GetOutOfHereException {
        if (user.equals(User.SAUL)) {
            throw new GetOutOfHereException();
        }
        // Upper-case every field, in line with asyncCapitalizeUser; the previous version
        // returned an unchanged copy despite the method name.
        return new User(
                user.getUsername().toUpperCase(),
                user.getFirstname().toUpperCase(),
                user.getLastname().toUpperCase());
    }

    protected final class GetOutOfHereException extends Exception {
        private static final long serialVersionUID = 0L;
    }
    /**
     * Builds an upper-cased copy of {@code u} and wraps it in a mono.
     *
     * @param u the user whose fields are upper-cased
     * @return a mono emitting the upper-cased user
     */
    Mono<User> asyncCapitalizeUser(User u) {
        String username = u.getUsername().toUpperCase();
        String firstname = u.getFirstname().toUpperCase();
        String lastname = u.getLastname().toUpperCase();
        User capitalized = new User(username, firstname, lastname);
        return Mono.just(capitalized);
    }

    /**
     * Upper-cases the first letter of the username of the user emitted by {@code mono};
     * firstname and lastname are copied unchanged.
     *
     * @param mono source of the user to transform
     * @return a mono emitting the transformed user
     */
    Mono<User> capitalizeOne(Mono<User> mono) {
        return mono.map(u -> new User(
                capitalizeFirstLetter(u.getUsername()), u.getFirstname(), u.getLastname()));
    }

    // Upper-cases the first character of text; null or empty text is returned unchanged
    // (substring(0, 1) would throw on an empty string).
    private static String capitalizeFirstLetter(String text) {
        if (text == null || text.isEmpty()) {
            return text;
        }
        return text.substring(0, 1).toUpperCase() + text.substring(1);
    }

    /**
     * Immutable value object describing a user by username, firstname and lastname.
     * Two users are equal when all three fields are equal. The constructor does not reject
     * {@code null}, so {@code equals} and {@code hashCode} tolerate {@code null} fields.
     */
    public static class User {

        /** Predefined sample users. */
        public static final User SKYLER = new User("swhite", "Skyler", "White");
        public static final User JESSE = new User("jpinkman", "Jesse", "Pinkman");
        public static final User WALTER = new User("wwhite", "Walter", "White");
        public static final User SAUL = new User("sgoodman", "Saul", "Goodman");

        private final String username;

        private final String firstname;

        private final String lastname;

        /**
         * Creates a user from its three fields; the values are stored as given.
         *
         * @param username  login name
         * @param firstname first name
         * @param lastname  last name
         */
        public User(String username, String firstname, String lastname) {
            this.username = username;
            this.firstname = firstname;
            this.lastname = lastname;
        }

        /** @return the username passed to the constructor */
        public String getUsername() {
            return username;
        }

        /** @return the firstname passed to the constructor */
        public String getFirstname() {
            return firstname;
        }

        /** @return the lastname passed to the constructor */
        public String getLastname() {
            return lastname;
        }

        /**
         * Compares this user with another object field by field.
         *
         * @param o the object to compare with; may be {@code null}
         * @return {@code true} if {@code o} is a {@code User} of the same class with equal
         *         username, firstname and lastname
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            User other = (User) o;

            // Objects.equals avoids the NullPointerException a direct field.equals(...)
            // call would raise when this user was built with null fields.
            return Objects.equals(username, other.username)
                    && Objects.equals(firstname, other.firstname)
                    && Objects.equals(lastname, other.lastname);
        }

        /**
         * Computes a hash consistent with {@link #equals(Object)}.
         * For non-null fields the value is the same as the plain
         * {@code 31 * h + field.hashCode()} chain; a {@code null} field contributes 0.
         */
        @Override
        public int hashCode() {
            int result = Objects.hashCode(username);
            result = 31 * result + Objects.hashCode(firstname);
            result = 31 * result + Objects.hashCode(lastname);
            return result;
        }

        /** @return a text of the form {@code Person{username='..', firstname='..', lastname='..'}} */
        @Override
        public String toString() {
            return "Person{" +
                    "username='" + username + '\'' +
                    ", firstname='" + firstname + '\'' +
                    ", lastname='" + lastname + '\'' +
                    '}';
        }
    }


    /**
     * Repository contract whose operations are expressed with Reactor and Reactive Streams
     * types ({@code Mono}, {@code Flux}, {@code Publisher}).
     * No implementation is visible in this file, so the notes below describe the signatures;
     * the exact semantics should be confirmed against the implementing class.
     *
     * @param <T> type of the stored entities
     */
    public interface ReactiveRepository<T> {

        /**
         * Takes a publisher of entities and returns a {@code Mono<Void>};
         * presumably the mono terminates once the entities have been stored (TODO confirm).
         */
        Mono<Void> save(Publisher<T> publisher);

        /** Returns a mono of a single entity; presumably the first stored one (TODO confirm). */
        Mono<T> findFirst();

        /** Returns the entities as a {@code Flux}. */
        Flux<T> findAll();
        // NOTE(review): Iterable-returning counterpart of findAll(), judging by the signature;
        // the trailing 'L' in the name looks accidental — confirm before renaming.
        Iterable<T> findAllL();

        /** Returns a mono of the entity looked up by {@code id}; behavior for an unknown id is not shown here. */
        Mono<T> findById(String id);
    }

    /**
     * Repository contract with plain, synchronous signatures. It is the counterpart of
     * {@code ReactiveRepository} and is used by {@code fluxToBlockingRepository} and
     * {@code blockingRepositoryToFlux}.
     * No implementation is visible in this file; confirm exact semantics with the implementing class.
     *
     * @param <T> type of the stored entities
     */
    public interface BlockingRepository<T> {

        /** Stores the given entity. */
        void save(T entity);

        /** Returns a single entity; presumably the first stored one (TODO confirm). */
        T findFirst();

        /** Returns the entities as a list. */
        List<T> findAll();
        /** Returns the entity looked up by {@code id}; behavior for an unknown id is not shown here. */
        T findById(String id);
    }
}
